msg_tool\scripts\kirikiri\archive\xp3/
writer.rs

1use super::archive::*;
2use super::consts::*;
3use super::reader::*;
4use super::segmenter::*;
5use crate::ext::io::*;
6use crate::ext::mutex::*;
7use crate::scripts::base::*;
8use crate::types::*;
9use crate::utils::encoding::*;
10use crate::utils::threadpool::ThreadPool;
11use anyhow::Result;
12use sha2::{Digest, Sha256};
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::io::{Seek, Write};
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17
/// Location and sizes of one segment's data inside the archive file.
///
/// One of these is kept per distinct segment hash so that later occurrences of
/// the same data can refer to the bytes that were already written.
#[derive(Clone)]
struct WrittenSegment {
    /// Whether the stored bytes are compressed.
    is_compressed: bool,
    /// Offset in the archive file at which the stored bytes begin.
    start: u64,
    /// Length of the segment before compression, in bytes.
    original_size: u64,
    /// Length of the bytes as stored in the archive.
    archived_size: u64,
}
25
/// Counters collected while packing; atomics so that worker threads can update
/// them through a shared reference.
#[derive(Default)]
struct Stats {
    /// Sum of the uncompressed sizes of all processed segments.
    total_original_size: AtomicU64,
    /// Number of data bytes appended to the archive.
    final_archive_size: AtomicU64,
    /// Number of processed segments.
    total_segments: AtomicUsize,
    /// Number of segments whose data was physically written.
    unique_segments: AtomicUsize,
    /// Stored bytes that did not have to be written again.
    deduplication_savings: AtomicU64,
}

/// Renders the counters as a five-line, human-readable report.
impl std::fmt::Display for Stats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each counter is read independently; the report is informational only.
        let original = self.total_original_size.load(Ordering::Relaxed);
        let archived = self.final_archive_size.load(Ordering::Relaxed);
        let segments = self.total_segments.load(Ordering::Relaxed);
        let unique = self.unique_segments.load(Ordering::Relaxed);
        let saved = self.deduplication_savings.load(Ordering::Relaxed);
        write!(
            f,
            "Total Original Size: {} bytes\nFinal Archive Size: {} bytes\nTotal Segments: {}\nUnique Segments: {}\nDeduplication Savings: {} bytes",
            original, archived, segments, unique, saved
        )
    }
}
63
64pub struct Xp3ArchiveWriter<T: Write + Seek> {
65    file: Arc<Mutex<T>>,
66    segments: Arc<Mutex<HashMap<[u8; 32], WrittenSegment>>>,
67    items: Arc<Mutex<BTreeMap<String, ArchiveItem>>>,
68    runner: ThreadPool<Result<()>>,
69    compress_files: bool,
70    compress_index: bool,
71    zlib_compression_level: u32,
72    segmenter: Option<Arc<Box<dyn Segmenter + Send + Sync>>>,
73    stats: Arc<Stats>,
74    compress_workers: usize,
75    processing_segments: Arc<Mutex<HashSet<[u8; 32]>>>,
76    use_zstd: bool,
77    zstd_compression_level: i32,
78    no_adler: bool,
79    #[cfg(feature = "zopfli")]
80    use_zopfli: bool,
81    #[cfg(feature = "zopfli")]
82    zopfli_iteration_count: std::num::NonZeroU64,
83    #[cfg(feature = "zopfli")]
84    zopfli_iterations_without_improvement: std::num::NonZeroU64,
85    #[cfg(feature = "zopfli")]
86    zopfli_maximum_block_splits: u16,
87}
88
89impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
90    pub fn new(filename: &str, files: &[&str], config: &ExtraConfig) -> Result<Self> {
91        let file = std::fs::File::create(filename)?;
92        let mut file = std::io::BufWriter::new(file);
93        let mut items = BTreeMap::new();
94        for file in files {
95            let item = ArchiveItem {
96                name: file.to_string(),
97                file_hash: 0,
98                original_size: 0,
99                archived_size: 0,
100                segments: Vec::new(),
101            };
102            items.insert(file.to_string(), item);
103        }
104        let segmenter = create_segmenter(&config.xp3_segmenter).map(|s| Arc::new(s));
105        file.write_all(XP3_MAGIC)?;
106        file.write_u64(0)?; // Placeholder for index offset
107        Ok(Self {
108            file: Arc::new(Mutex::new(file)),
109            segments: Arc::new(Mutex::new(HashMap::new())),
110            items: Arc::new(Mutex::new(items)),
111            runner: ThreadPool::new(
112                if config.xp3_segmenter.is_none() {
113                    1
114                } else {
115                    config.xp3_pack_workers.max(1)
116                },
117                Some("xp3-writer"),
118                false,
119            )?,
120            compress_files: config.xp3_compress_files,
121            compress_index: config.xp3_compress_index,
122            zlib_compression_level: config.zlib_compression_level,
123            segmenter,
124            stats: Arc::new(Stats::default()),
125            compress_workers: config.xp3_compress_workers.max(1),
126            processing_segments: Arc::new(Mutex::new(HashSet::new())),
127            use_zstd: config.xp3_zstd,
128            zstd_compression_level: config.zstd_compression_level,
129            no_adler: config.xp3_no_adler,
130            #[cfg(feature = "zopfli")]
131            use_zopfli: config.xp3_zopfli,
132            #[cfg(feature = "zopfli")]
133            zopfli_iteration_count: config.zopfli_iteration_count,
134            #[cfg(feature = "zopfli")]
135            zopfli_iterations_without_improvement: config.zopfli_iterations_without_improvement,
136            #[cfg(feature = "zopfli")]
137            zopfli_maximum_block_splits: config.zopfli_maximum_block_splits,
138        })
139    }
140}
141
142struct Writer<'a> {
143    inner: Box<dyn Write + 'a>,
144    mem: MemWriter,
145}
146
147impl std::fmt::Debug for Writer<'_> {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("Writer").field("mem", &self.mem).finish()
150    }
151}
152
153impl<'a> Write for Writer<'a> {
154    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
155        self.mem.write(buf)
156    }
157
158    fn flush(&mut self) -> std::io::Result<()> {
159        self.mem.flush()
160    }
161}
162
163impl<'a> Seek for Writer<'a> {
164    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
165        self.mem.seek(pos)
166    }
167
168    fn stream_position(&mut self) -> std::io::Result<u64> {
169        self.mem.stream_position()
170    }
171
172    fn rewind(&mut self) -> std::io::Result<()> {
173        self.mem.rewind()
174    }
175}
176
177impl<'a> Drop for Writer<'a> {
178    fn drop(&mut self) {
179        let _ = self.inner.write_all(&self.mem.data);
180        let _ = self.inner.flush();
181    }
182}
183
184impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
185    fn new_file<'a>(
186        &'a mut self,
187        name: &str,
188        size: Option<u64>,
189    ) -> Result<Box<dyn WriteSeek + 'a>> {
190        let inner = self.new_file_non_seek(name, size)?;
191        Ok(Box::new(Writer {
192            inner,
193            mem: MemWriter::new(),
194        }))
195    }
196
197    fn new_file_non_seek<'a>(
198        &'a mut self,
199        name: &str,
200        _size: Option<u64>,
201    ) -> Result<Box<dyn Write + 'a>> {
202        if self.segmenter.is_none() {
203            self.runner.join();
204        }
205        for err in self.runner.take_results() {
206            err?;
207        }
208        let item = {
209            let items = self.items.lock_blocking();
210            Arc::new(Mutex::new(
211                items
212                    .get(name)
213                    .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
214                    .clone(),
215            ))
216        };
217        let (reader, writer) = std::io::pipe()?;
218        let reader = Reader::new(reader);
219        {
220            let file = self.file.clone();
221            let segments = self.segments.clone();
222            let items = self.items.clone();
223            let segmenter = self.segmenter.clone();
224            let stats = self.stats.clone();
225            let is_compressed = self.compress_files;
226            let zlib_compression_level = self.zlib_compression_level;
227            let workers = if self.segmenter.is_some() && is_compressed {
228                Some(Arc::new(ThreadPool::<Result<()>>::new(
229                    self.compress_workers,
230                    Some("xp3-compress"),
231                    false,
232                )?))
233            } else {
234                None
235            };
236            let processiong_segments = self.processing_segments.clone();
237            let use_zstd = self.use_zstd;
238            #[cfg(feature = "zopfli")]
239            let use_zopfli = self.use_zopfli;
240            #[cfg(feature = "zopfli")]
241            let zopfli_iteration_count = self.zopfli_iteration_count;
242            #[cfg(feature = "zopfli")]
243            let zopfli_iterations_without_improvement = self.zopfli_iterations_without_improvement;
244            #[cfg(feature = "zopfli")]
245            let zopfli_maximum_block_splits = self.zopfli_maximum_block_splits;
246            let zstd_compression_level = self.zstd_compression_level;
247            let name = name.to_owned();
248            self.runner.execute(
249                move |_| {
250                    let mut reader = reader;
251                    let mut offset_in_file = 0u64;
252                    if let Some(segmenter) = segmenter {
253                        for seg in segmenter.segment(&mut reader, &name) {
254                            let seg = seg?;
255                            let hash: [u8; 32] = Sha256::digest(&seg).into();
256                            let seg_offset_in_file = offset_in_file;
257                            offset_in_file += seg.len() as u64;
258                            let fseg = match {
259                                let mut segments = segments.lock_blocking();
260                                if let Some(old_seg) = segments.get(&hash) {
261                                    Err(old_seg.clone())
262                                } else {
263                                    let seg_data = WrittenSegment {
264                                        is_compressed,
265                                        start: 0,
266                                        original_size: seg.len() as u64,
267                                        archived_size: seg.len() as u64,
268                                    };
269                                    segments.insert(hash, seg_data.clone());
270                                    Ok(seg_data)
271                                }
272                            } {
273                                Ok(mut info) => {
274                                    if let Some(workers) = workers.as_ref() {
275                                        {
276                                            let mut processing =
277                                                processiong_segments.lock_blocking();
278                                            processing.insert(hash);
279                                        }
280                                        let file = file.clone();
281                                        let segments = segments.clone();
282                                        let stats = stats.clone();
283                                        let item = item.clone();
284                                        let processiong_segments = processiong_segments.clone();
285                                        workers.execute(
286                                            move |_| {
287                                                let data = {
288                                                    if use_zopfli {
289                                                        let option = zopfli::Options {
290                                                            iteration_count: zopfli_iteration_count,
291                                                            iterations_without_improvement:
292                                                                zopfli_iterations_without_improvement,
293                                                            maximum_block_splits:
294                                                                zopfli_maximum_block_splits,
295                                                        };
296                                                        let mut e = zopfli::ZlibEncoder::new(option, zopfli::BlockType::Dynamic, Vec::new())?;
297                                                        e.write_all(&seg)?;
298                                                        e.finish()?
299                                                    } else if use_zstd {
300                                                        let mut e = zstd::stream::Encoder::new(
301                                                            Vec::new(),
302                                                            zstd_compression_level,
303                                                        )?;
304                                                        e.write_all(&seg)?;
305                                                        e.finish()?
306                                                    } else {
307                                                        let mut e = flate2::write::ZlibEncoder::new(
308                                                            Vec::new(),
309                                                            flate2::Compression::new(
310                                                                zlib_compression_level,
311                                                            ),
312                                                        );
313                                                        e.write_all(&seg)?;
314                                                        e.finish()?
315                                                    }
316                                                };
317                                                let mut file = file.lock_blocking();
318                                                let start = file.seek(std::io::SeekFrom::End(0))?;
319                                                file.write_all(&data)?;
320                                                info.start = start;
321                                                info.archived_size = data.len() as u64;
322                                                let stats = stats.clone();
323                                                stats.total_original_size.fetch_add(
324                                                    info.original_size,
325                                                    Ordering::Relaxed,
326                                                );
327                                                stats.final_archive_size.fetch_add(
328                                                    info.archived_size,
329                                                    Ordering::Relaxed,
330                                                );
331                                                stats
332                                                    .total_segments
333                                                    .fetch_add(1, Ordering::Relaxed);
334                                                stats
335                                                    .unique_segments
336                                                    .fetch_add(1, Ordering::Relaxed);
337                                                let mut segments = segments.lock_blocking();
338                                                segments.insert(hash, info.clone());
339                                                let ninfo = Segment {
340                                                    is_compressed: info.is_compressed,
341                                                    start: info.start,
342                                                    offset_in_file: seg_offset_in_file,
343                                                    original_size: info.original_size,
344                                                    archived_size: info.archived_size,
345                                                };
346                                                let mut item = item.lock_blocking();
347                                                item.original_size += ninfo.original_size;
348                                                item.archived_size += ninfo.archived_size;
349                                                item.segments.push(ninfo);
350                                                let mut processing =
351                                                    processiong_segments.lock_blocking();
352                                                processing.remove(&hash);
353                                                Ok(())
354                                            },
355                                            true,
356                                        )?;
357                                        None
358                                    } else {
359                                        {
360                                            let mut processing =
361                                                processiong_segments.lock_blocking();
362                                            processing.insert(hash);
363                                        }
364                                        let data = seg;
365                                        let mut file = file.lock_blocking();
366                                        let start = file.seek(std::io::SeekFrom::End(0))?;
367                                        file.write_all(&data)?;
368                                        info.start = start;
369                                        info.archived_size = data.len() as u64;
370                                        let stats = stats.clone();
371                                        stats
372                                            .total_original_size
373                                            .fetch_add(info.original_size, Ordering::Relaxed);
374                                        stats
375                                            .final_archive_size
376                                            .fetch_add(info.archived_size, Ordering::Relaxed);
377                                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
378                                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
379                                        let mut segments = segments.lock_blocking();
380                                        segments.insert(hash, info.clone());
381                                        let ninfo = Segment {
382                                            is_compressed: info.is_compressed,
383                                            start: info.start,
384                                            offset_in_file: seg_offset_in_file,
385                                            original_size: info.original_size,
386                                            archived_size: info.archived_size,
387                                        };
388                                        {
389                                            let mut processing =
390                                                processiong_segments.lock_blocking();
391                                            processing.remove(&hash);
392                                        }
393                                        Some(ninfo)
394                                    }
395                                }
396                                Err(mut seg_info) => {
397                                    let mut need_update = false;
398                                    loop {
399                                        if {
400                                            let processing = processiong_segments.lock_blocking();
401                                            !processing.contains(&hash)
402                                        } {
403                                            break;
404                                        }
405                                        need_update = true;
406                                        std::thread::sleep(std::time::Duration::from_millis(10));
407                                    }
408                                    if need_update {
409                                        seg_info = {
410                                            let segments = segments.lock_blocking();
411                                            segments
412                                                .get(&hash)
413                                                .ok_or(anyhow::anyhow!(
414                                                    "Failed to get latest segment info."
415                                                ))?
416                                                .clone()
417                                        };
418                                    }
419                                    let stats = stats.clone();
420                                    stats
421                                        .total_original_size
422                                        .fetch_add(seg_info.original_size, Ordering::Relaxed);
423                                    stats
424                                        .deduplication_savings
425                                        .fetch_add(seg_info.archived_size, Ordering::Relaxed);
426                                    stats.total_segments.fetch_add(1, Ordering::Relaxed);
427                                    let ninfo = Segment {
428                                        is_compressed: seg_info.is_compressed,
429                                        start: seg_info.start,
430                                        offset_in_file: seg_offset_in_file,
431                                        original_size: seg_info.original_size,
432                                        archived_size: seg_info.archived_size,
433                                    };
434                                    Some(ninfo)
435                                }
436                            };
437                            if let Some(fseg) = fseg {
438                                let mut item = item.lock_blocking();
439                                item.original_size += fseg.original_size;
440                                item.archived_size += fseg.archived_size;
441                                item.segments.push(fseg);
442                            }
443                        }
444                    } else {
445                        let mut file = file.lock_blocking();
446                        let start = file.seek(std::io::SeekFrom::End(0))?;
447                        let size = {
448                            let mut writer = if is_compressed {
449                                if use_zopfli {
450                                    let e = zopfli::ZlibEncoder::new(
451                                        zopfli::Options {
452                                            iteration_count: zopfli_iteration_count,
453                                            iterations_without_improvement:
454                                                zopfli_iterations_without_improvement,
455                                            maximum_block_splits: zopfli_maximum_block_splits,
456                                        },
457                                        zopfli::BlockType::Dynamic,
458                                        &mut *file,
459                                    )?;
460                                    Box::new(e) as Box<dyn Write>
461                                } else if use_zstd {
462                                    let e = zstd::stream::Encoder::new(
463                                        &mut *file,
464                                        zstd_compression_level,
465                                    )?.auto_finish();
466                                    Box::new(e) as Box<dyn Write>
467                                } else {
468                                    let e = flate2::write::ZlibEncoder::new(
469                                        &mut *file,
470                                        flate2::Compression::new(zlib_compression_level),
471                                    );
472                                    Box::new(e) as Box<dyn Write>
473                                }
474                            } else {
475                                Box::new(&mut *file) as Box<dyn Write>
476                            };
477                            std::io::copy(&mut reader, &mut writer)?
478                        };
479                        let ninfo = Segment {
480                            is_compressed,
481                            start,
482                            offset_in_file: 0,
483                            original_size: size,
484                            archived_size: if is_compressed {
485                                file.stream_position()? - start
486                            } else {
487                                size
488                            },
489                        };
490                        let mut item = item.lock_blocking();
491                        item.original_size += ninfo.original_size;
492                        item.archived_size += ninfo.archived_size;
493                        let stats = stats.clone();
494                        stats
495                            .total_original_size
496                            .fetch_add(ninfo.original_size, Ordering::Relaxed);
497                        stats
498                            .final_archive_size
499                            .fetch_add(ninfo.archived_size, Ordering::Relaxed);
500                        stats.total_segments.fetch_add(1, Ordering::Relaxed);
501                        stats.unique_segments.fetch_add(1, Ordering::Relaxed);
502                        item.segments.push(ninfo);
503                    }
504                    if let Some(workers) = workers {
505                        workers.join();
506                        for err in workers.take_results() {
507                            err?;
508                        }
509                    }
510                    let mut item = item.lock_blocking().to_owned();
511                    item.file_hash = reader.into_checksum();
512                    item.segments.sort_by_key(|s| s.offset_in_file);
513                    let mut items = items.lock_blocking();
514                    items.insert(item.name.clone(), item);
515                    Ok(())
516                },
517                true,
518            )?;
519        }
520        Ok(Box::new(writer))
521    }
522
523    fn write_header(&mut self) -> Result<()> {
524        self.runner.join();
525        for err in self.runner.take_results() {
526            err?;
527        }
528        let mut file = self.file.lock_blocking();
529        let index_offset = file.seek(std::io::SeekFrom::End(0))?;
530        let mut index_data = MemWriter::new();
531        let items = self.items.lock_blocking();
532        for (_, item) in items.iter() {
533            let mut file_chunk = MemWriter::new();
534            let name = encode_string(Encoding::Utf16LE, &item.name, false)?;
535            let info_data_size = name.len() as u64 + 22;
536            file_chunk.write_all(CHUNK_INFO)?;
537            file_chunk.write_u64(info_data_size)?;
538            file_chunk.write_u32(0)?; // flags
539            file_chunk.write_u64(item.original_size)?;
540            file_chunk.write_u64(item.archived_size)?;
541            file_chunk.write_u16(name.len() as u16 / 2)?;
542            file_chunk.write_all(&name)?;
543            let segm_data_size = item.segments.len() as u64 * 28;
544            file_chunk.write_all(CHUNK_SEGM)?;
545            file_chunk.write_u64(segm_data_size)?;
546            for seg in &item.segments {
547                let flag = if seg.is_compressed {
548                    TVP_XP3_SEGM_ENCODE_ZLIB
549                } else {
550                    TVP_XP3_SEGM_ENCODE_RAW
551                };
552                file_chunk.write_u32(flag)?;
553                file_chunk.write_u64(seg.start)?;
554                file_chunk.write_u64(seg.original_size)?;
555                file_chunk.write_u64(seg.archived_size)?;
556            }
557            let adlr_data_size = 4;
558            file_chunk.write_all(CHUNK_ADLR)?;
559            file_chunk.write_u64(adlr_data_size)?;
560            if self.no_adler {
561                file_chunk.write_u32(0)?;
562            } else {
563                file_chunk.write_u32(item.file_hash)?;
564            }
565            index_data.write_all(CHUNK_FILE)?;
566            let file_chunk = file_chunk.into_inner();
567            index_data.write_u64(file_chunk.len() as u64)?;
568            index_data.write_all(&file_chunk)?;
569        }
570        let index_data = index_data.into_inner();
571        if self.compress_index {
572            let compressed_index = if self.use_zopfli {
573                let option = zopfli::Options {
574                    iteration_count: self.zopfli_iteration_count,
575                    iterations_without_improvement: self.zopfli_iterations_without_improvement,
576                    maximum_block_splits: self.zopfli_maximum_block_splits,
577                };
578                let mut e =
579                    zopfli::ZlibEncoder::new(option, zopfli::BlockType::Dynamic, Vec::new())?;
580                e.write_all(&index_data)?;
581                e.finish()?
582            } else if self.use_zstd {
583                let mut e = zstd::stream::Encoder::new(Vec::new(), self.zstd_compression_level)?;
584                e.write_all(&index_data)?;
585                e.finish()?
586            } else {
587                let mut e = flate2::write::ZlibEncoder::new(
588                    Vec::new(),
589                    flate2::Compression::new(self.zlib_compression_level),
590                );
591                e.write_all(&index_data)?;
592                e.finish()?
593            };
594            file.write_u8(TVP_XP3_INDEX_ENCODE_ZLIB)?;
595            file.write_u64(compressed_index.len() as u64)?;
596            file.write_u64(index_data.len() as u64)?;
597            file.write_all(&compressed_index)?;
598        } else {
599            file.write_u8(TVP_XP3_INDEX_ENCODE_RAW)?;
600            file.write_u64(index_data.len() as u64)?;
601            file.write_all(&index_data)?;
602        }
603        file.write_u64_at(11, index_offset)?; // Write index offset to header
604        file.flush()?;
605        eprintln!("XP3 Archive Statistics:\n{}", self.stats);
606        Ok(())
607    }
608}